gustavodemorais
Contributor

What is the purpose of the change

The planner currently treats the union of the unique and upsert keys from the left and right inputs as a valid upsert key for the join result. That holds for inner joins, but for left, right, and full joins the resulting key can contain columns that may be null, which is not valid.

Brief change log

  • Check for null-generating columns when creating the superset of unique keys
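
The rule described above can be illustrated with a small, self-contained model. This is a sketch, not the planner code: `join_upsert_keys` and all of its parameters are hypothetical names, keys are modeled as sets of column indexes, and the two uniqueness flags are supplied by the caller rather than derived from metadata. The column numbers mirror the adjusted test (left key `{1}`, right-side columns 5 and 6); the left-side field count of 5 is an assumption made so the numbers line up.

```python
from itertools import product


def join_upsert_keys(left_keys, right_keys, left_field_count, join_type,
                     left_join_cols_unique=True, right_join_cols_unique=True,
                     reject_nullable_concatenations=True):
    """Toy model: derive candidate keys for a join output (hypothetical helper)."""
    nulls_on_left = join_type in ("right", "full")   # left columns may be padded with NULL
    nulls_on_right = join_type in ("left", "full")   # right columns may be padded with NULL

    left = [frozenset(k) for k in left_keys]
    # Right-side columns are shifted past the left-side columns in the join output.
    right = [frozenset(c + left_field_count for c in k) for k in right_keys]

    keys = set()
    # Concatenated keys mix columns from both sides, so they contain a
    # null-generating column whenever either side can be padded.
    if not (reject_nullable_concatenations and (nulls_on_left or nulls_on_right)):
        keys.update(l | r for l, r in product(left, right))
    # A side's own keys survive only if the other side's join columns are
    # unique and that side itself is never padded with NULLs.
    if right_join_cols_unique and not nulls_on_left:
        keys.update(left)
    if left_join_cols_unique and not nulls_on_right:
        keys.update(right)
    return keys


left_keys = [{1}]
right_keys = [{0}, {0, 1}]  # become {5} and {5, 6} with 5 left-side fields (assumed)

old = join_upsert_keys(left_keys, right_keys, 5, "left", reject_nullable_concatenations=False)
new = join_upsert_keys(left_keys, right_keys, 5, "left")
inner = join_upsert_keys(left_keys, right_keys, 5, "inner")
print(sorted(map(sorted, old)))
print(sorted(map(sorted, new)))
```

In this model, a left join without the extra check keeps the concatenated keys that include right-side columns, while the check leaves only the left-side key. An inner join is unaffected because neither side is padded with NULLs.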

Verifying this change

  • Adjusted existing tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Collaborator

flinkbot commented Oct 8, 2025

CI report:


- assertEquals(
-   toBitSet(Array(1), Array(1, 5), Array(1, 5, 6)),
-   mq.getUpsertKeys(logicalLeftJoinOnUniqueKeys).toSet)
+ assertEquals(toBitSet(Array(1)), mq.getUpsertKeys(logicalLeftJoinOnUniqueKeys).toSet)
Contributor

Am I missing something? I do not see tests for nulls on each side.

Contributor Author

The above is the test: columns 5 and 6 come from the right side of the join, so they can't be included in unique keys, and that's why "Array(1, 5), Array(1, 5, 6)" were removed as possible unique keys.

The same goes for all the other tests that were updated; they test the new behaviour.

@gustavodemorais
Contributor Author

CI is failing on an unrelated e2e test issue that is happening across PRs.

@xuyangzhong
Contributor

Hi, @gustavodemorais. I'm wondering why we can't have null columns as part of unique keys? From what I see in the SQL standard, it seems that unique keys can contain null columns.
Here is the specific description from the SQL 2016 standard:

A unique constraint that does not include a <without overlap specification> on a table T is satisfied if and only
if there do not exist two rows R1 and R2 of T such that R1 and R2 have the same non-null values in the unique
columns. If a unique constraint UC on a table T includes a <without overlap specification>WOS, then let ATPN
be the <application time period name> contained in WOS. UC is satisfied if and only if there do not exist two
rows R1 and R2 of T such that R1 and R2 have the same non-null values in the unique columns and the ATPN
period values of R1 and R2 overlap. In addition, if the unique constraint was defined with PRIMARY KEY,
then it requires that none of the values in the specified column or columns be a null value.
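
The distinction drawn in the quoted passage can be tried out in a SQL engine. The following is a minimal sketch using Python's built-in `sqlite3` module; the table names are made up for illustration. SQLite treats NULLs as distinct in UNIQUE constraints. The primary-key columns are declared NOT NULL explicitly here because SQLite, for historical reasons, does not enforce that on its own for ordinary tables.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE with_unique (a INT, b INT, UNIQUE (a, b))")
conn.execute("CREATE TABLE with_pk (a INT NOT NULL, b INT NOT NULL, PRIMARY KEY (a, b))")

# Two rows that agree on `a` but have NULL in `b` do not violate the unique constraint.
conn.execute("INSERT INTO with_unique VALUES (1, NULL)")
conn.execute("INSERT INTO with_unique VALUES (1, NULL)")
unique_rows = conn.execute("SELECT COUNT(*) FROM with_unique").fetchone()[0]

# The same row is rejected once the columns form a primary key.
try:
    conn.execute("INSERT INTO with_pk VALUES (1, NULL)")
    pk_rejected = False
except sqlite3.IntegrityError:
    pk_rejected = True

print(unique_rows, pk_rejected)
```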

github-actions bot added the community-reviewed label ("PR has been reviewed by the community.") on Oct 9, 2025
@gustavodemorais
Contributor Author

gustavodemorais commented Oct 20, 2025

Hey @xuyangzhong, thanks for the comment. I was at Flink Forward and just reading your reply now.

That's a good catch. The issue is that we are using the unique keys as upsert keys, and in my understanding we can't allow upsert keys to contain null values; that's what I was trying to fix here. It leads to a runtime error since a sink expects the upsert key not to contain a null value, which can happen with a left join. https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala#L352

We could also have additional logic in getJoinUpsertKeys to remove such keys. Do you have other suggestions?

@xuyangzhong
Contributor

@gustavodemorais IIUC, currently there is no explicit handling of null values for the upsert key from the planning stage (where it originates from the unique key, which is not restricted from containing null values) to runtime (where the operators do not include any special treatment for columns that may contain null values in the upsert key). In other words, there are no restrictions preventing the upsert key from containing null values. I’m curious about the issue you mentioned. Could you explain it in more detail?

It leads to a runtime error since a sink expects the upsert key not to contain a null value which can happen in a case of a left join

@gustavodemorais
Contributor Author

I’m curious about the issue you mentioned. Could you explain it in more detail?

Sure. If we create two tables and apply a left join, the upsert key can contain nullable columns, as in the examples in the tests. We can then create a third table with a primary key and submit an INSERT INTO whose sink primary key matches this upsert key. The planner accepts the statement, and as soon as we produce one row with a null value, the job fails.

Here are simple repro steps:

CREATE TABLE `users` (
  `user_id` INT NOT NULL,
  `other_data` STRING,
  CONSTRAINT `PRIMARY` PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'key.format' = 'json',
  'properties.bootstrap.servers' = 'localhost:9092',
  'topic' = 'users',
  'value.format' = 'json'
);

CREATE TABLE `orders_with_composite_key` (
  `order_id` BIGINT NOT NULL,
  `user_id` INT NOT NULL,
  `item_name` STRING,
  CONSTRAINT `PRIMARY` PRIMARY KEY (`order_id`, `user_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'key.format' = 'json',
  'properties.bootstrap.servers' = 'localhost:9092',
  'topic' = 'orders',
  'value.format' = 'json'
);

CREATE TABLE test_join_upsert (
  user_id INT NOT NULL,
  order_id BIGINT NOT NULL,
  user_id0 INT NOT NULL,
  other_data STRING,
  PRIMARY KEY (user_id, order_id, user_id0) NOT ENFORCED -- this matches the nullable upsert key calculated by the planner
)
WITH (
  'connector' = 'upsert-kafka',
  'key.format' = 'json',
  'properties.bootstrap.servers' = 'localhost:9092',
  'topic' = 'users_orders_upsert',
  'value.format' = 'json'
);

INSERT INTO test_join_upsert
SELECT
  o.user_id,
  o.order_id,
  u.user_id AS user_id0,
  u.other_data
FROM `users` AS u
  LEFT JOIN `orders_with_composite_key` AS o
  ON o.user_id = u.user_id;

-- this causes the job to fail
INSERT INTO users (VALUES (1, '1'));

Two things happen:

  • Running explain on the insert above doesn't show upsertMaterialize=[true] because the primary key matches the upsert key calculated by the planner (which I don't think is correct)
  • It fails at runtime after inserting a user, with:
Caused by: org.apache.flink.table.runtime.operators.sink.constraint.EnforcerException: Column 'user_id' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='DROP' to suppress this exception and drop such records silently.
	at org.apache.flink.table.runtime.operators.sink.constraint.NotNullConstraint.enforce(NotNullConstraint.java:55)
	at org.apache.flink.table.runtime.operators.sink.constraint.ConstraintEnforcerExecutor.enforce(ConstraintEnforcerExecutor.java:445)
	at org.apache.flink.table.runtime.operators.sink.constraint.ConstraintEnforcer.processElement(ConstraintEnforcer.java:66)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at StreamExecCalc$6.processElement(Unknown Source)

Ideally we would fail this during planning. Now, I still think that by definition upsert keys cannot contain null values - do you agree with that? At the same time, while reproducing this I realized the runtime issue might not be solved by addressing nullable upsert keys. What do you think?
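
One way to picture a planning-time check is sketched below. It is purely illustrative: `find_nullable_pk_columns`, its arguments, and the idea of passing per-column nullability as a dict are assumptions for this sketch, not an existing Flink API. Given the sink's primary-key columns and the nullability derived for the query's output columns, it reports the key columns that could receive NULL in the repro above.

```python
def find_nullable_pk_columns(sink_primary_key, query_output_nullability):
    """Return primary-key columns whose source expression may be NULL.

    Both arguments are hypothetical inputs for this sketch: the sink's
    primary-key column names and a mapping from sink column name to whether
    the matching query output column is nullable.
    """
    return [col for col in sink_primary_key if query_output_nullability[col]]


# Nullability of the SELECT list in the repro, keyed by the sink column it feeds.
# Columns taken from the right side of the LEFT JOIN (alias `o`) become nullable.
output_nullability = {
    "user_id": True,      # o.user_id
    "order_id": True,     # o.order_id
    "user_id0": False,    # u.user_id
    "other_data": True,   # u.other_data
}

offending = find_nullable_pk_columns(["user_id", "order_id", "user_id0"], output_nullability)
if offending:
    print(f"primary key columns may receive NULL: {offending}")
```

A check of this shape flags `user_id` and `order_id`, which is consistent with the runtime exception naming `user_id` as the first NOT NULL column that received a null value.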

@xuyangzhong
Contributor

xuyangzhong commented Oct 22, 2025

@gustavodemorais I agree with your point that we should ideally catch this unexpected behavior during the planning phase. In fact, I believe there's a fundamental issue with how this SQL is written. When using a LEFT JOIN, one must be aware that fields from the right table may be null. If the sink table defines some fields from the right table as part of the combined primary key (which must not contain null values), then it's reasonable to encounter an error stating "null attempted to be written to not null."

Moreover, this issue is not related to sink upsert materialization, which is intended to address out-of-order problems. With this SQL, out-of-order issues aren't present, so upsertMaterialize=[true] won't be shown. In other words, even if upsert materialization were used, it wouldn't prevent null values from being written to the primary key.

I ran the SQL on MySQL, and I received a similar error regarding null being written to the primary key.

CREATE TABLE T1 (
  a1 int primary key,
  b1 int
);

CREATE TABLE T2 (
  a2 int primary key,
  b2 int
);

CREATE TABLE T3 (
  a1 int,
  b1 int,
  a2 int,
  b2 int,
  primary key (a1, a2)
);

insert into T1 values(1, 1);
insert into T1 values(2, 2);
insert into T2 values(1, 1);

insert into T3 select * from T1 left join T2 on a1 = a2;
ERROR 1048 (23000) at line 23: Column 'a2' cannot be null
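
The same behaviour can be reproduced without a MySQL server. The sketch below uses Python's built-in `sqlite3` module with the tables from the example above. Unlike the MySQL DDL, `a1` and `a2` in `T3` are declared NOT NULL explicitly, since SQLite would otherwise accept NULL in a primary-key column of an ordinary table. The error text differs from MySQL's.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE T1 (a1 INT PRIMARY KEY, b1 INT);
    CREATE TABLE T2 (a2 INT PRIMARY KEY, b2 INT);
    CREATE TABLE T3 (a1 INT NOT NULL, b1 INT, a2 INT NOT NULL, b2 INT, PRIMARY KEY (a1, a2));
    INSERT INTO T1 VALUES (1, 1), (2, 2);
    INSERT INTO T2 VALUES (1, 1);
""")

# The unmatched left row (2, 2) is padded with NULLs for the right-side columns.
joined = conn.execute(
    "SELECT * FROM T1 LEFT JOIN T2 ON a1 = a2 ORDER BY a1"
).fetchall()

# Writing the join result into T3 fails because a2 is part of the primary key.
try:
    conn.execute("INSERT INTO T3 SELECT * FROM T1 LEFT JOIN T2 ON a1 = a2")
    error = None
except sqlite3.IntegrityError as exc:
    error = str(exc)

print(joined)
print(error)
```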
